import pandas as pd
from pyspark.sql import SparkSession
import plotly.graph_objs as go
from plotly.offline import init_notebook_mode, iplot
init_notebook_mode()
import datetime
import numpy as np
import warnings
warnings.filterwarnings('ignore')
import os
import sys
module_path = os.path.abspath('../../instacart-ml')
if module_path not in sys.path:
    sys.path.append(module_path)
import common_utility.ModelEvaluation as me
import common_utility.PlotlyObject as plt
from glob import glob
import pyspark.sql.functions as F
def read_spark_csv(spark, path):
    """Read a CSV into a Spark DataFrame with a header row and schema inference."""
    return spark.read.option("header", "true").option("inferSchema", "true").csv(path)
spark = SparkSession.builder \
    .appName("My Spark Application") \
    .config("spark.master", "local[*]") \
    .config("spark.driver.memory", "10g") \
    .config("spark.executor.memory", "30g") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
logger = spark._jvm.org.apache.log4j
logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)
# glob() returns files in arbitrary order, so index-based access is fragile;
# reference the Kaggle dataset files by name instead.
data_dir = "/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/data"
product = read_spark_csv(spark, f"{data_dir}/products.csv")
order = read_spark_csv(spark, f"{data_dir}/orders.csv")
order_products_train = read_spark_csv(spark, f"{data_dir}/order_products__train.csv")
departments = read_spark_csv(spark, f"{data_dir}/departments.csv")
aisles = read_spark_csv(spark, f"{data_dir}/aisles.csv")
order_products_prior = read_spark_csv(spark, f"{data_dir}/order_products__prior.csv")
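# Added sanity check: print row counts and column names to confirm each file
# landed in the intended variable.
for name, sdf in [("product", product), ("order", order),
                  ("order_products_train", order_products_train),
                  ("departments", departments), ("aisles", aisles),
                  ("order_products_prior", order_products_prior)]:
    print(name, sdf.count(), sdf.columns)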
#### Cohort 1 ####
# For every train-set user, collect every product_id they purchased in prior
# orders; each (user, product) pair becomes one row of the training set.
order_schema = ['order_id', 'user_id', 'eval_set', 'order_dow', 'order_hour_of_day', 'days_since_prior_order']
train_user = order.filter(F.col("eval_set") == 'train').select("user_id").distinct()
order_product = order_products_prior\
    .unionByName(order_products_train)\
    .join(order.select(*order_schema), ['order_id'], 'inner')\
    .join(train_user, ['user_id'], 'inner')
cohort_df = order_product.filter(F.col("eval_set") == "prior").groupBy('user_id')\
    .agg(F.collect_set("product_id").alias("product_id"))\
    .withColumn("product_id", F.explode("product_id"))
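# Note: collect_set followed by explode reduces to the distinct (user_id,
# product_id) pairs, so an equivalent and simpler formulation would be:
# cohort_df = order_product.filter(F.col("eval_set") == "prior")\
#     .select("user_id", "product_id").distinct()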
cohort_schema = ["order_id", "product_id", 'user_id']
feature_schema = ['user_id', 'order_id', 'order_dow', 'order_hour_of_day', 'days_since_prior_order']
# Right-join the candidate (user, product) pairs against the train order's
# products: a non-null order_id means the product was bought again (label 1).
train_df = order_product.filter(F.col("eval_set") == 'train').select(*cohort_schema)\
    .join(cohort_df, ['user_id', 'product_id'], 'right')\
    .withColumn("reordered", F.when(F.col("order_id").isNotNull(), 1).otherwise(0)).drop("order_id")
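# Added check: the right join makes every previously purchased product a
# candidate row, so negatives should dominate; peek at the label balance.
train_df.groupBy("reordered").count().show()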
# Each user has exactly one train order, so dropDuplicates on user_id keeps
# that order's context features (dow, hour, days since prior).
output_df = order_product.filter(F.col("eval_set") == 'train') \
    .dropDuplicates(["user_id"]).select(*feature_schema) \
    .join(train_df, ['user_id'], 'inner')\
    .join(product.select("product_id", 'department_id'), ['product_id'], 'inner')
output_path = "/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/train_df.parquet"
output_df.write.mode("overwrite").parquet(output_path)
# Resulting grain: one row per (user_id, product_id) pair with the binary reordered label
df = pd.read_parquet(output_path)
df.columns
Index(['product_id', 'user_id', 'order_id', 'order_dow', 'order_hour_of_day',
'days_since_prior_order', 'reordered', 'department_id'],
dtype='object')
#### Cohort 2 ####
df = spark.read.parquet(output_path)
user_order = order.filter(F.col("eval_set") == 'prior')\
    .groupBy(['user_id']).agg(F.count_distinct("order_id").alias("user_order_num"))
# Aisle id and per-user purchase frequency for each product:
# reorder_rate = (# prior orders containing the product) / (# prior orders)
reorder_freq = order_products_prior\
    .join(order.select("user_id", "order_id"), ["order_id"], "left")\
    .groupBy(['user_id', "product_id"]).agg(F.count("order_id").alias("num_order"))\
    .join(user_order, ["user_id"], "left")\
    .withColumn("reorder_rate", F.round(F.col("num_order") / F.col("user_order_num"), 2))\
    .join(product.select("product_id", "aisle_id"), ["product_id"], "inner")
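# Worked example with hypothetical numbers: a user with user_order_num = 10
# prior orders who bought a product in 4 of them gets
# reorder_rate = round(4 / 10, 2) = 0.4.
reorder_freq.select("user_id", "product_id", "num_order", "user_order_num", "reorder_rate").show(5)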
# Per-user mean/std of the gap between orders (computed over all of the user's orders)
temp = order.groupBy("user_id").agg(F.mean("days_since_prior_order").alias("mean_day"),
                                    F.stddev("days_since_prior_order").alias("std_day"))
# Scale days_since_prior_order for the train order as a per-user z-score
scale_day_prior = order\
    .filter(F.col("eval_set") == "train")\
    .join(temp, ["user_id"], "inner")\
    .withColumn("scale_day_prior", (F.col("days_since_prior_order") - F.col("mean_day")) / F.col("std_day"))\
    .select("user_id", "scale_day_prior")
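# Worked example with hypothetical numbers: a user averaging mean_day = 7.0
# days between orders with std_day = 3.5 who waits 14 days before the train
# order gets scale_day_prior = (14 - 7.0) / 3.5 = 2.0. Note stddev() is null
# for users with a single prior gap, which leaves scale_day_prior null for them.
scale_day_prior.show(5)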
output_df2 = df.join(reorder_freq.select("product_id", "user_id", "reorder_rate", "aisle_id"), ["user_id", "product_id"], "left")\
    .join(scale_day_prior, ['user_id'], "left")\
    .drop("days_since_prior_order")
output_path2 = "/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/train_df2.parquet"
output_df2.write.mode("overwrite").parquet(output_path2)
df2 = pd.read_parquet(output_path2)
df2.columns
Index(['user_id', 'product_id', 'order_id', 'order_dow', 'order_hour_of_day',
'reordered', 'department_id', 'reorder_rate', 'aisle_id',
'scale_day_prior'],
dtype='object')
# Feature Engineering
# `df` was rebound to a Spark DataFrame above, so reload Cohort 1 as pandas
# before handing it to scikit-learn.
df = pd.read_parquet(output_path)
from sklearn.preprocessing import OneHotEncoder
encoder = OneHotEncoder(sparse_output=False)  # `sparse` was renamed `sparse_output` in scikit-learn 1.2
encoder_list = ['order_dow', 'order_hour_of_day', 'department_id']
encoded_data = encoder.fit_transform(df[encoder_list])
columns = encoder.get_feature_names_out(encoder_list)
one_hot_encoded_df = pd.DataFrame(encoded_data, columns=columns)
df = pd.concat([df, one_hot_encoded_df], axis=1)
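# Added guard: pd.concat aligns on the index, so a misalignment between df and
# the freshly built one_hot_encoded_df would surface as NaNs in the new columns.
assert not df[columns].isna().any().any()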
# Train/test split by user_id so no user contributes rows to both sets
test_ratio = 0.2
n_id = df['user_id'].nunique()
test_id = df['user_id'].drop_duplicates().sample(int(n_id * test_ratio)).tolist()
train_df = df[~df['user_id'].isin(test_id)].reset_index(drop=True)
test_df = df[df['user_id'].isin(test_id)].reset_index(drop=True)
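# Added guard: confirm the split is leak-free at the user level
assert set(train_df['user_id']).isdisjoint(test_df['user_id'])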
# Ratio of negative to positive rows; passed to XGBoost's scale_pos_weight
# to compensate for class imbalance
neg_pos_ratio = train_df[train_df['reordered'] == 0].shape[0] \
    / train_df[train_df['reordered'] == 1].shape[0]
input_var_list = columns.tolist() + ['days_since_prior_order']
label = 'reordered'
train_x = train_df[input_var_list]
test_x = test_df[input_var_list]
train_y = train_df[label]
test_y = test_df[label]
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
logreg = LogisticRegression(class_weight='balanced', max_iter=100, n_jobs=-1, verbose=0)
logreg.fit(train_x, train_y)
test_df['log_prob'] = logreg.predict_proba(test_x)[:, 1]
test_df['log_pred'] = (test_df['log_prob'] > 0.5).astype(int)
train_df['log_prob'] = logreg.predict_proba(train_x)[:, 1]
train_df['log_pred'] = (train_df['log_prob'] > 0.5).astype(int)
/Users/karenwang/anaconda3/envs/mcgill_ml/lib/python3.11/site-packages/sklearn/linear_model/_logistic.py:458: ConvergenceWarning: lbfgs failed to converge (status=1):
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.
Increase the number of iterations (max_iter) or scale the data as shown in:
https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
n_iter_i = _check_optimize_result(
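# The warning above is expected: the unscaled days_since_prior_order column
# slows lbfgs. One remedy, sketched here with my own names (not part of the
# original pipeline): standardize the features in a Pipeline and allow more
# iterations.
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
logreg_scaled = make_pipeline(
    StandardScaler(),
    LogisticRegression(class_weight='balanced', max_iter=1000, n_jobs=-1),
)
logreg_scaled.fit(train_x, train_y)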
import xgboost as xgb
clf = xgb.XGBClassifier(n_estimators=300, max_depth=6, n_jobs=-1, scale_pos_weight=neg_pos_ratio)
clf.fit(train_x, train_y)
# Predict the test set results
test_df['xgb_prob'] = clf.predict_proba(test_x)[:, 1]
test_df['xgb_pred'] = (test_df['xgb_prob'] > 0.5).astype(int)
train_df['xgb_prob'] = clf.predict_proba(train_x)[:, 1]
train_df['xgb_pred'] = (train_df['xgb_prob'] > 0.5).astype(int)
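# Added check: complement the ROC overlay below with scalar AUCs via
# sklearn.metrics.roc_auc_score.
from sklearn.metrics import roc_auc_score
for name in ['log_prob', 'xgb_prob']:
    print(name,
          'train AUC:', round(roc_auc_score(train_df[label], train_df[name]), 4),
          'test AUC:', round(roc_auc_score(test_df[label], test_df[name]), 4))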
trace1 = me.create_roc_trace(train_df, label, 'log_prob', 'train_logistic')
trace2 = me.create_roc_trace(test_df, label, 'log_prob', 'test_logistic')
trace3 = me.create_roc_trace(train_df, label, 'xgb_prob', 'train_xgb')
trace4 = me.create_roc_trace(test_df, label, 'xgb_prob', 'test_xgb')
data = [trace1, trace2, trace3, trace4]
me.create_overlay_roc_curve(data)
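# common_utility.ModelEvaluation is project-local. If it is unavailable, a
# minimal sketch of equivalent helpers (not the project's actual
# implementation) could be:
from sklearn.metrics import roc_curve
def roc_trace(frame, label_col, prob_col, name):
    fpr, tpr, _ = roc_curve(frame[label_col], frame[prob_col])
    return go.Scatter(x=fpr, y=tpr, mode='lines', name=name)
def overlay_roc(traces):
    layout = go.Layout(xaxis=dict(title='False Positive Rate'),
                       yaxis=dict(title='True Positive Rate'))
    iplot(go.Figure(data=traces, layout=layout))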